package com.qiudao.rabbitmq.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Description: 主题交换机 ，消费者接收队列信息
 * @author: gdc
 * @date: 2019/11/23
 * @version 1.0
 */
@Component
public class TopicReceiver {

    /**
     *  \@RabbitListener：监听的队列
     *  \@RabbitHandler：但@RabbitListener注解在类上时，需要使用@RabbitHandler来指明调用的方法。
     *  \void basicAck(long deliveryTag, boolean multiple) throws IOException; 确认消息接收。
     *          deliveryTag:该消息的index，
     *          multiple：是否批量.true:将一次性ack所有小于deliveryTag的消息。
     *   channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 拒绝消息接收
     *          deliveryTag:该消息的index，
     *          multiple：是否批量.true:将一次性拒绝所有小于deliveryTag的消息，
     *          requeue：被拒绝的是否重新入队列。
     *   channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 拒绝消息接收。
     *          deliveryTag:该消息的index，
     *          requeue：被拒绝的是否重新入队列，
     *          channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息，而basicReject一次只能拒绝一条消息。
     *   chanel.basicQos(int prefetchSize, int prefetchCount, boolean global) 消息限流的功能，防止生产过多，导致消费者消费吃力的情况;
     *          prefetchSize： 0表示对消息的大小无限制,单位为(B-字节)
     *          prefetchCount：会告诉RabbitMQ不要同时给一个消费者推送多于N个消息，即一旦有N个消息还没有ack，则该consumer将 阻塞 掉，直到有消息ack。0为无上限
     *          global：true\false 是否将上面设置应用于channel，简单点说，就是上面限制是channel级别的还是consumer级别。
     */
//    @RabbitHandler
    @RabbitListener(queues = "topic.man")
    public void processMan(Map info, Channel channel, Message message) {
        try {
            channel.basicQos(0, 1, false);
            System.out.println("TopicManReceiver消费者接收消息： " + info.toString());
            // 消息确认接收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            try {
                // 消息拒绝接收，此处需要考虑，消息一直循环失败的问题
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }


    @RabbitListener(queues = "topic.woman")
    public void processWoman(Map info, Channel channel, Message message) {
        try {
            channel.basicQos(0, 1, false);
            System.out.println("TopicWomanReceiver消费者接收的消息为： " + info.toString());
            // 消息确认接收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            try {
                // 消息拒绝接收
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

    }
}
